In [14]:
# Initialize Spark from Python
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark import SparkFiles
# getOrCreate() reuses an already-running SparkContext instead of raising
# "Cannot run multiple SparkContexts at once" when this cell is re-executed.
# NOTE: if a context is already active, its settings are kept and this conf
# is ignored.
sc = SparkContext.getOrCreate(
    SparkConf().setMaster("local").setAppName("Additional opportunities Spark"))

Accumulators


In [50]:
# Lazily load the log as an RDD of lines (nothing is read until an action runs).
# NOTE: `file` shadows the Python 2 builtin of the same name; later cells use it.
file = sc.textFile(name="log.txt")

In [51]:
# Driver-side counter that tasks can only add to; read it back via .value.
blankLines = sc.accumulator(value=0)

In [52]:
def extractCallSigns(line):
    """Split a log line on single spaces and return the resulting tokens.

    Side effect: an empty line adds 1 to the `blankLines` accumulator.
    Because this runs inside a transformation (flatMap), Spark may apply that
    update more than once if a task or stage is re-executed.
    """
    if line == "":
        # .add() mutates the accumulator in place, so no `global` declaration
        # is needed (only the rebinding `+=` form requires one).
        blankLines.add(1)
    return line.split(" ")

In [53]:
# One output element per space-separated token of every input line.
callSigns = file.flatMap(f=extractCallSigns)

In [54]:
# Run an action first: the accumulator is only updated once the flatMap is
# actually executed, so .value is meaningful only after this.
callSigns.count()
# print() with one argument behaves the same on Python 2 and 3; the bare
# `print "..."` statement is a SyntaxError on Python 3.
print("Blank lines: %d" % blankLines.value)


Blank lines: 4

Broadcast variables


In [ ]:
# Ship the call-sign prefix table to the workers as a read-only broadcast
# variable instead of capturing it in every task closure.
# TODO: `callSignPrefixTable` stands in for the real prefix table (the cell
# originally held a `<data>` placeholder, which is not valid Python); define
# it before running this cell.
signPrefixes = sc.broadcast(callSignPrefixTable)


def processSignCount(sign_count, signPrefixes):
    """Map a (callSign, count) pair to (country, count).

    sign_count   -- pair whose [0] is the call sign and [1] is the count.
    signPrefixes -- Broadcast wrapping the prefix table; its .value is passed
                    to lookupCountry.

    NOTE(review): lookupCountry is not defined in this notebook; it must be
    provided before this cell runs.
    """
    country = lookupCountry(sign_count[0], signPrefixes.value)
    count = sign_count[1]
    return (country, count)


# RDD.map calls its function with exactly one argument, so passing
# processSignCount directly would raise TypeError (missing signPrefixes);
# bind the broadcast explicitly instead.
# NOTE(review): contactCounts (presumably an RDD of (callSign, count) pairs)
# must be defined before this cell runs.
countryContactCounts = (
    contactCounts
    .map(lambda sign_count: processSignCount(sign_count, signPrefixes))
    .reduceByKey(lambda x, y: x + y))

R (language)

R and the `Imap` R package must be installed inside the `spark-notebook` container first:

docker exec -it spark-notebook apt-get install r-base
docker exec -it spark-notebook R
> install.packages('Imap')
> q()

In [2]:
# Sample contact log. Each element must be a (callSign, [contact, ...]) pair:
# the following cells call .values() on this RDD and read the keys "mylat",
# "mylong", "contactlat" and "contactlong" from every contact dict, which a
# flat list of ints (the previous value) cannot support.
# Replace with real log data as needed.
contactsContactList = sc.parallelize([
    ("KK6JKQ", [
        {"mylat": 37.75, "mylong": -122.41,
         "contactlat": 34.05, "contactlong": -118.24},
        # Incomplete record: dropped by the hasDistInfo filter.
        {"mylat": 37.75, "mylong": -122.41,
         "contactlat": "", "contactlong": ""},
    ]),
    ("W6BB", [
        {"mylat": 34.05, "mylong": -118.24,
         "contactlat": 40.71, "contactlong": -74.01},
    ]),
])

In [3]:
# Register the R script with Spark so SparkFiles.get() can later resolve a
# local copy of it by file name.
distScriptName = "finddistance.R"
distScript = distScriptName  # relative to the driver's working directory
sc.addFile(path=distScript)

In [4]:
def hasDistInfo(call):
    """Return True if `call` has a truthy value for every coordinate field.

    Used as the filter in front of formatCall, which reads these same four
    keys. A missing key now counts as "no info" (returns False) instead of
    raising KeyError, which would fail the Spark task running the filter.

    NOTE(review): truthiness also rejects a literal 0 / 0.0 coordinate
    (equator / prime meridian); confirm that is acceptable for this data.
    """
    requiredFields = ["mylat", "mylong", "contactlat", "contactlong"]
    return all(call.get(f) for f in requiredFields)

In [5]:
def formatCall(call):
    """Render one contact as the comma-separated line fed to the R script.

    Field order: mylat, mylong, contactlat, contactlong. Raises KeyError if
    any of those keys is absent (the pipeline filters with hasDistInfo first).
    """
    coordinateKeys = ("mylat", "mylong", "contactlat", "contactlong")
    values = [call[key] for key in coordinateKeys]
    return "{0},{1},{2},{3}".format(*values)

In [6]:
# Drop the call-sign keys, keep only contacts with complete coordinates, and
# turn each remaining contact into one CSV line for the R script's stdin.
pipeInputs = contactsContactList.values().flatMap(
    lambda calls: [formatCall(call) for call in calls if hasDistInfo(call)])

In [7]:
# Stream each element of pipeInputs, one per line, to the external script's
# stdin; every line the script prints becomes an element of `distances`.
# NOTE(review): the script presumably needs to be executable (shebang +
# exec bit) for pipe() to launch it; confirm on the workers.
distances = pipeInputs.pipe(command=SparkFiles.get(distScriptName))

In [ ]:
# Action: runs the pipe and returns every distance line to the driver as a
# list (fine for this small sample; avoid collect() on large RDDs).
distances.collect()

Numeric operations


In [15]:
# Small numeric RDD used to demonstrate the built-in statistics actions.
rdd = sc.parallelize(c=[23, 34, 33, 23])

In [16]:
# Action: number of elements in the RDD.
rdd.count()


Out[16]:
4

In [17]:
# Arithmetic mean of the elements. Each statistic in this section runs its
# own Spark job; rdd.stats() returns count/mean/stdev/max/min in one pass.
rdd.mean()


Out[17]:
28.25

In [18]:
# Sum of the elements.
rdd.sum()


Out[18]:
113

In [19]:
# Largest element.
rdd.max()


Out[19]:
34

In [20]:
# Smallest element.
rdd.min()


Out[20]:
23

In [21]:
# Population variance: sum of squared deviations divided by n.
rdd.variance()


Out[21]:
27.6875

In [22]:
# Sample variance: sum of squared deviations divided by n - 1.
rdd.sampleVariance()


Out[22]:
36.916666666666664

In [23]:
# Population standard deviation: square root of variance().
rdd.stdev()


Out[23]:
5.2618912949622967

In [24]:
# Sample standard deviation: square root of sampleVariance().
rdd.sampleStdev()


Out[24]:
6.0759087111860612

In [ ]: